package registry

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"sync"
	"time"
)

// 注册服务的 Web Service

const ServerPort = ":3000"
const ServicesURL = "http://localhost" + ServerPort + "/services"

type registry struct {
	registrations []Registration // 已经注册的服务，动态变化，可能会被并发访问
	mutex         *sync.RWMutex  // 保证在并发访问的时候，registrations 是线程安全的
}

// 注册新的服务
func (r *registry) add(reg Registration) error {
	r.mutex.Lock()
	// 添加服务
	r.registrations = append(r.registrations, reg)
	r.mutex.Unlock()

	// 针对刚要注册的服务，发送一个请求，把它所依赖的服务请求过来
	err := r.sendRequiredServices(reg)

	r.notify(patch{
		Added: []patchEntry{
			{
				Name: reg.ServiceName,
				URL:  reg.ServiceURL,
			},
		},
	})
	return err
}

// 当一个服务（如 log service）出现的时候
// 想要通知需要该服务的服务（如 grading service）

// 从目前已经注册的服务中，检查它们的依赖项在 patch 中是否出现
// 如果在 patch 中存在依赖项
// 如果是新增的，就通知，这个服务出现了
// 如果是移除的，那么就通知需要依赖项的服务，这个依赖项被移除了
func (r registry) notify(fullPatch patch) {
	r.mutex.RLock()
	defer r.mutex.RUnlock()

	// patch 每次更新会有很多条目
	// 这些条目我们不必依次发送通知，我们可以使用 go routine 来并发地发送通知

	// 针对已经注册的服务进行循环
	for _, reg := range r.registrations {
		// 把每个服务传到 go routine 中
		go func(reg Registration) {
			// 针对每一个已经注册的服务，对它所依赖的服务进行循环
			for _, reqService := range reg.RequiredServices {
				p := patch{Added: []patchEntry{}, Removed: []patchEntry{}}
				// 标志位，这个值转为 true 时，表示有需要更新的地方，
				sendUpdate := false

				// 对传进来的 fullPatch 进行循环，看它添加或移除了哪些服务
				// 如果它添加的服务正好是某一个服务的依赖项
				// 那么就把这个服务添加到 p 中，然后设置标志位

				for _, added := range fullPatch.Added {
					if added.Name == reqService {
						p.Added = append(p.Added, added)
						sendUpdate = true
					}
				}
				for _, removed := range fullPatch.Removed {
					if removed.Name == reqService {
						p.Removed = append(p.Removed, removed)
						sendUpdate = true
					}
				}

				// 需要发送通知
				if sendUpdate {
					// 把更新发送到对应的服务
					err := r.sendPatch(p, reg.ServiceUpdateURL)
					if err != nil {
						log.Println(err)
						return
					}
				}
			}
		}(reg)
	}
}

func (r registry) sendRequiredServices(reg Registration) error {
	r.mutex.RLock()
	defer r.mutex.RUnlock()

	// 从已经注册的服务中，找到当前要注册的服务所依赖的服务
	// 如果能找到，就加到里面去
	var p patch
	for _, serviceReg := range r.registrations {
		for _, reqService := range reg.RequiredServices {
			if serviceReg.ServiceName == reqService {
				p.Added = append(p.Added, patchEntry{
					Name: serviceReg.ServiceName,
					URL:  serviceReg.ServiceURL,
				})
			}
		}
	}

	err := r.sendPatch(p, reg.ServiceUpdateURL)
	if err != nil {
		return err
	}
	return nil
}

func (r registry) sendPatch(p patch, url string) error {
	// 将 patch 转换为 json
	d, err := json.Marshal(p)
	if err != nil {
		return err
	}

	// 发送 POST 请求
	// 创建实现 io.Reader 接口的 Buffer，将 json 格式的 patch 传入
	_, err = http.Post(url, "application/json", bytes.NewBuffer(d))
	if err != nil {
		return err
	}
	return nil
}

// 取消注册
func (r *registry) remove(url string) error {
	// 遍历所有服务
	for i := range reg.registrations {
		// 根据 url 找到要取消注册的服务
		if reg.registrations[i].ServiceURL == url {
			r.notify(patch{
				Removed: []patchEntry{
					{
						Name: r.registrations[i].ServiceName,
						URL:  r.registrations[i].ServiceURL,
					},
				},
			})

			r.mutex.Lock()
			reg.registrations = append(reg.registrations[:i], reg.registrations[i+1:]...) // 切片拼接
			r.mutex.Unlock()
			return nil
		}
	}
	return fmt.Errorf("service at URL %s not found", url)
}

// 监控所有服务的健康状态
// 每隔一段时间，向所有注册的服务发送一个请求
// 根据请求的结果，我们可以判断这些服务是否处于健康的状态
func (r *registry) heartbeat(freq time.Duration) {
	for {
		// 并发地向已注册的服务发送健康检查的请求
		var wg sync.WaitGroup
		// 循环已经注册的服务
		for _, reg := range r.registrations {
			wg.Add(1)
			go func(reg Registration) {
				defer wg.Done()
				success := true

				// 重试机制，每次间隔 1 秒，如果成功，则不用重试
				for attempts := 0; attempts < 3; attempts++ {
					// 向心跳接收 url 发送 GET 请求
					res, err := http.Get(reg.HeartbeatURL)
					if err != nil {
						log.Println(err)
					} else if res.StatusCode == http.StatusOK {
						log.Printf("Heartbeat check passed for %v", reg.ServiceName)

						// success 为 false 表示这个服务可能发生过移除
						if !success {
							r.add(reg)
						}
						break
					}
					log.Printf("Heartbeat check failed for %v", reg.ServiceName)
					if success {
						success = false
						r.remove(reg.ServiceURL)
					}
					time.Sleep(1 * time.Second)
				}
			}(reg)

			wg.Wait()
			time.Sleep(freq)
		}
	}
}

// 表示只会运行一次
var once sync.Once

func SetupRegistryService() {
	once.Do(func() {
		go reg.heartbeat(3 * time.Second)
	})
}

// 存放已经注册的服务
var reg = registry{
	registrations: make([]Registration, 0),
	mutex:         new(sync.RWMutex),
}

type RegistryService struct{}

// 实现 Handle 接口，需要实现的方法：ServeHTTP(ResponseWriter, *Request)
func (s RegistryService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	log.Println("Request received")

	switch r.Method {
	case http.MethodPost:
		// 对请求体进行解码
		dec := json.NewDecoder(r.Body)
		var r Registration
		err := dec.Decode(&r)
		if err != nil {
			log.Println(err)
			w.WriteHeader(http.StatusBadRequest)
			return
		}
		log.Printf("Adding service: %v with URL: %s\n", r.ServiceName,
			r.ServiceURL)

		// 将 r 添加进已注册的服务集合中
		err = reg.add(r)
		if err != nil {
			log.Println(err)
			w.WriteHeader(http.StatusBadRequest)
			return
		}
	case http.MethodDelete:
		// 此时请求体中的内容就是 url
		payload, err := ioutil.ReadAll(r.Body)
		if err != nil {
			log.Println(err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}

		// 将 []byte 转化成 string
		url := string(payload)
		log.Printf("Removing service at URL: %s", url)

		// 根据 url 将指定的服务从集合中移除
		err = reg.remove(url)
		if err != nil {
			log.Println(err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}
	default:
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}
}
